package drds.plus.executor.data_node_executor.nonspi;

import drds.plus.common.lifecycle.AbstractLifecycle;
import drds.plus.common.model.ThreadLocalString;
import drds.plus.common.thread_local.ThreadLocalMap;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.ExecutorException;
import drds.plus.executor.IExecutor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.result_cursor.ResultCursor;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.MDC;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@Slf4j
public class DataNodeExecutor extends AbstractLifecycle implements IDataNodeExecutor {

    public Logger log() {
        return log;
    }

    public String dataNode = "localhost";

    public Future<ISortingCursor> executeWithFuture(final ExecuteContext executeContext, final ExecutePlan executePlan) throws RuntimeException {

        final Map<Object, Object> threadContext = ThreadLocalMap.getContextMap();
        final Map mdcContext = MDC.getCopyOfContextMap();
        ExecutorService executorService = executeContext.getExecutorService();
        if (executorService == null) {
            throw new ExecutorException("concurrentExecutors is null, cannot  parallelly");
        }

        Future<ISortingCursor> future = executorService.submit(new Callable<ISortingCursor>() {

            public ISortingCursor call() throws Exception {
                ThreadLocalMap.setContextMap(resetContext(threadContext));

                MDC.setContextMap(mdcContext);
                return execute(executeContext, executePlan);
            }
        });
        return future;
    }

    public ISortingCursor execute(ExecuteContext executeContext, ExecutePlan executePlan) throws RuntimeException {
        return getDataNodeExecutor(executeContext, executePlan).execute(executeContext, executePlan);
    }

    private IExecutor getDataNodeExecutor(ExecuteContext executeContext, ExecutePlan executePlan) {
        if (executeContext == null) {
            throw new ExecutorException("execution context is null");
        }

        String dataNodeId = executePlan.getDataNodeId();
        if (dataNodeId == null) {
            throw new RuntimeException("datanode in execute_plan is null, execute_plan is:\n" + executePlan);
        }

        return getDataNodeExecutor(dataNodeId);
    }

    private IExecutor getDataNodeExecutor(String dataNodeId) {

        IExecutor executor = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutorManager().getDataNodeExecutor(dataNodeId);
        if (executor == null) {
            throw new ExecutorException("cannot find executor for datanode:" + dataNodeId + "\ngroupByList:\n" + DataNodeExecutorContext.getExecutorContext().getDataNodeExecutorManager());
        }

        return executor;
    }

    public ResultCursor commit(ExecuteContext executeContext) throws RuntimeException {

        return getDataNodeExecutor("DUAL_GROUP").commit(executeContext);
    }

    public ResultCursor rollback(ExecuteContext executeContext) throws RuntimeException {

        return getDataNodeExecutor("DUAL_GROUP").rollback(executeContext);
    }

    public Future<ResultCursor> commitWithFuture(final ExecuteContext executeContext) throws RuntimeException {

        final Map<Object, Object> threadContext = ThreadLocalMap.getContextMap();
        final Map mdcContext = MDC.getCopyOfContextMap();
        ExecutorService executorService = executeContext.getExecutorService();
        if (executorService == null) {
            throw new ExecutorException("concurrentExecutors is null, cannot  parallelly");
        }

        Future<ResultCursor> future = executorService.submit(new Callable<ResultCursor>() {

            public ResultCursor call() throws Exception {

                ThreadLocalMap.setContextMap(resetContext(threadContext));
                MDC.setContextMap(mdcContext);
                return commit(executeContext);
            }
        });
        return future;
    }

    public Future<ResultCursor> rollbackWithFuture(final ExecuteContext executeContext) throws RuntimeException {

        final Map<Object, Object> threadContext = ThreadLocalMap.getContextMap();
        final Map mdcContext = MDC.getCopyOfContextMap();
        ExecutorService executorService = executeContext.getExecutorService();
        if (executorService == null) {
            throw new ExecutorException("concurrentExecutors is null, cannot  parallelly");
        }

        Future<ResultCursor> future = executorService.submit(new Callable<ResultCursor>() {

            public ResultCursor call() throws Exception {

                ThreadLocalMap.setContextMap(resetContext(threadContext));
                MDC.setContextMap(mdcContext);
                return rollback(executeContext);
            }
        });
        return future;
    }

    public Future<List<ISortingCursor>> executeWithFuture(final ExecuteContext executeContext, final List<ExecutePlan> executePlanList) throws RuntimeException {

        final Map<Object, Object> threadContext = ThreadLocalMap.getContextMap();
        final Map mdcContext = MDC.getCopyOfContextMap();
        ExecutorService executorService = executeContext.getExecutorService();
        if (executorService == null) {
            throw new ExecutorException("concurrentExecutors is null, cannot parallelly");
        }

        Future<List<ISortingCursor>> future = executorService.submit(new Callable<List<ISortingCursor>>() {

            public List<ISortingCursor> call() throws Exception {
                List<ISortingCursor> cursors = new ArrayList(executePlanList.size());

                ThreadLocalMap.setContextMap(resetContext(threadContext));
                MDC.setContextMap(mdcContext);
                for (ExecutePlan executePlan : executePlanList) {
                    cursors.add(execute(executeContext, executePlan));
                }

                return cursors;
            }
        });
        return future;
    }

    public String getDataNode() {
        return dataNode;
    }

    public void setDataNode(String dataNode) {
        this.dataNode = dataNode;
    }

    private Map<Object, Object> resetContext(Map<Object, Object> threadContext) {
        Map<Object, Object> context = new HashMap(threadContext);
        if (context.containsKey(ThreadLocalString.TXC_CONTEXT_MANAGER) == true) {
            if (ThreadLocalMap.containsKey(ThreadLocalString.TXC_CONTEXT_MANAGER) == false) {
                context.remove(ThreadLocalString.TXC_CONTEXT_MANAGER);
                context.put(ThreadLocalString.TXC_CONTEXT_MANAGER, ThreadLocalString.TXC_MANAGER_NAME);
            }
        }
        return context;
    }
}
